package demo

import (
	"fmt"
	"com/models"
)

func StreamDemo(disInstance DISInstance) {
	fmt.Println("--------------------------StreamDemo start---------------------------");
	CreateStream(disInstance)
	UpdateStreamCount(disInstance)
	DescStream(disInstance)
	ListStreams(disInstance)
	//DeleteStream(disInstance)
	fmt.Println("---------------------------StreamDemo end----------------------------");
}

//创建Stream
func CreateStream(disInstance DISInstance) {
	dis := disInstance.Dis
	input := &models.CreateStreamRequest{
		StreamName    :disInstance.StreamName,
		PartitionCount :1,
		DataDuration:24,
		DataType:models.DATA_TYPE_CSV,
		DataSchema:   "{\"type\":\"record\",\"name\":\"RecordName\",\"doc\":\"Schema generated by Kite\",\"fields\":[{\"name\":\"field_0\",\"type\":\"long\",\"doc\":\"Type inferred from '1'\"}]}", }
	result, _ := dis.CreateStream(input)
	if result.Err != nil {
		fmt.Printf("Failed to create Stream [%s].\n", disInstance.StreamName);
	} else {
		fmt.Printf("Success to create Stream [%s].\n", disInstance.StreamName);
	}
}

//扩容
func UpdateStreamCount(disInstance DISInstance) {
	dis := disInstance.Dis
	input := &models.UpdatePartitionCountRequest{
		StreamName    :disInstance.StreamName,
		TargetPartitionCount:2,
	}
	result, _ := dis.UpdatePartitionCount(input)
	if result.Err != nil {
		fmt.Printf("Failed to update Stream count [%s].\n", disInstance.StreamName);
	} else {
		fmt.Printf("Success to update Stream count [%s].\n", disInstance.StreamName);
	}
}

//查询Stream详情
func DescStream(disInstance DISInstance) {
	dis := disInstance.Dis
	input := &models.DescribeStreamRequest{
		StreamName    :disInstance.StreamName,
	}
	result, output := dis.DescStream(input)
	if result.Err != nil {
		fmt.Printf("Failed to desc Stream [%s].\n", disInstance.StreamName);
	} else {
		fmt.Printf("Stream name [%s], Id [%s], createTime [%d].\n", output.StreamName, output.StreamId, output.CreateTime);
	}
}

//查询Stream列表
func ListStreams(disInstance DISInstance) {
	input := &models.ListStreamsRequest{
		Limit:10,
	}
	dis := disInstance.Dis
	result, output := dis.ListStreams(input)
	if result.Err != nil {
		fmt.Printf("Failed to list Stream.\n");
	} else {
		fmt.Printf("ListStreams, size [%d], hasMoreStream [%t].\n", output.StreamNumber, output.HasMoreStreams);
		for i, Stream := range output.StreamInfos {
			fmt.Printf("Stream%d name [%s], PartitionCount [%d], createTime [%d].\n", i, Stream.StreamName, Stream.PartitionCount, Stream.CreateTime);
		}
	}
}

//删除Stream
func DeleteStream(disInstance DISInstance) {
	dis := disInstance.Dis
	input := &models.DeleteStreamRequest{
		StreamName:disInstance.StreamName,
	}
	result, _ := dis.DeleteStream(input)
	if result.Err != nil {
		fmt.Printf("Failed to delete Stream [%s].\n", disInstance.StreamName);
	} else {
		fmt.Printf("Success to delete Stream [%s].\n", disInstance.StreamName);
	}
}
